/*
 * Copyright (2023) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.Row;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.deletionvectors.Base85Codec;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.stream.IntStream;

/**
 * Information about a deletion vector (DV) attached to a file action.
 *
 * <p>A descriptor records how the DV is stored ({@link #getStorageType()}), where it can be found
 * or its inlined content ({@link #getPathOrInlineDv()}), the optional byte offset inside the
 * containing file, the serialized size and the number of rows it removes.
 */
public class DeletionVectorDescriptor {

  ////////////////////////////////////////////////////////////////////////////////
  // Static Fields / Methods
  ////////////////////////////////////////////////////////////////////////////////

  /**
   * Builds a descriptor from a {@link Row} laid out according to {@link #READ_SCHEMA}.
   *
   * @param row row holding the descriptor fields in {@link #READ_SCHEMA} order; may be null
   * @return the parsed descriptor, or {@code null} when {@code row} is null
   */
  public static DeletionVectorDescriptor fromRow(Row row) {
    if (row == null) {
      return null;
    }

    final String storageType = requireNonNull(row, 0, "storageType").getString(0);
    final String pathOrInlineDv = requireNonNull(row, 1, "pathOrInlineDv").getString(1);
    // `offset` is the only nullable field of the schema.
    final Optional<Integer> offset =
        row.isNullAt(2) ? Optional.empty() : Optional.of(row.getInt(2));
    final int sizeInBytes = requireNonNull(row, 3, "sizeInBytes").getInt(3);
    final long cardinality = requireNonNull(row, 4, "cardinality").getLong(4);

    return new DeletionVectorDescriptor(
        storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
  }

  /**
   * Builds a descriptor from one entry of a struct {@link ColumnVector} whose children follow
   * {@link #READ_SCHEMA} order.
   *
   * @param vector struct vector holding deletion vector descriptors
   * @param rowId index of the entry to read
   * @return the parsed descriptor, or {@code null} when the entry at {@code rowId} is null
   */
  public static DeletionVectorDescriptor fromColumnVector(ColumnVector vector, int rowId) {
    if (vector.isNullAt(rowId)) {
      return null;
    }

    final String storageType =
        requireNonNull(vector.getChild(0), rowId, "storageType").getString(rowId);
    final String pathOrInlineDv =
        requireNonNull(vector.getChild(1), rowId, "pathOrInlineDv").getString(rowId);
    // `offset` is the only nullable field of the schema.
    final ColumnVector offsetVector = vector.getChild(2);
    final Optional<Integer> offset =
        offsetVector.isNullAt(rowId) ? Optional.empty() : Optional.of(offsetVector.getInt(rowId));
    final int sizeInBytes = requireNonNull(vector.getChild(3), rowId, "sizeInBytes").getInt(rowId);
    final long cardinality =
        requireNonNull(vector.getChild(4), rowId, "cardinality").getLong(rowId);

    return new DeletionVectorDescriptor(
        storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
  }

  // Markers to separate different kinds of DV storage.
  public static final String PATH_DV_MARKER = "p";
  public static final String INLINE_DV_MARKER = "i";
  public static final String UUID_DV_MARKER = "u";

  /** Schema used to read (and write, see {@link #toRow()}) a deletion vector descriptor. */
  public static final StructType READ_SCHEMA =
      new StructType()
          .add("storageType", StringType.STRING, false /* nullable*/)
          .add("pathOrInlineDv", StringType.STRING, false /* nullable*/)
          .add("offset", IntegerType.INTEGER, true /* nullable*/)
          .add("sizeInBytes", IntegerType.INTEGER, false /* nullable*/)
          .add("cardinality", LongType.LONG, false /* nullable*/);

  /** Maps every {@link #READ_SCHEMA} field name to its ordinal; must be declared after it. */
  private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
      IntStream.range(0, READ_SCHEMA.length())
          .boxed()
          .collect(toMap(i -> READ_SCHEMA.at(i).getName(), i -> i));

  /** String that is used in all file names generated by deletion vector store */
  private static final String DELETION_VECTOR_FILE_NAME_CORE = "deletion_vector";

  ////////////////////////////////////////////////////////////////////////////////
  // Instance Fields / Methods
  ////////////////////////////////////////////////////////////////////////////////

  /**
   * Indicates how the DV is stored. Should be a single letter (see {@link #pathOrInlineDv} below).
   */
  private final String storageType;

  /**
   * Contains the actual data that allows accessing the DV.
   *
   * <p>Three options are currently supported:
   *
   * <ul>
   *   <li>{@code storageType="u"}, format {@code <random prefix - optional><base85 encoded uuid>}:
   *       the deletion vector is stored in a file with a path relative to the data directory of
   *       this Delta Table, and the file name can be reconstructed from the UUID. The encoded UUID
   *       is always exactly 20 characters, so the random prefix length can be determined by any
   *       characters exceeding 20.
   *   <li>{@code storageType="i"}, format {@code <base85 encoded bytes>}: the deletion vector is
   *       stored inline in the log.
   *   <li>{@code storageType="p"}, format {@code <absolute path>}: the DV is stored in a file with
   *       an absolute path given by this url.
   * </ul>
   */
  private final String pathOrInlineDv;

  /**
   * Start of the data for this DV in number of bytes from the beginning of the file it is stored
   * in.
   *
   * <p>Always empty when storageType = "i".
   */
  private final Optional<Integer> offset;

  /** Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding). */
  private final int sizeInBytes;

  /** Number of rows the DV logically removes from the file. */
  private final long cardinality;

  /**
   * Creates a descriptor from its raw components.
   *
   * @param storageType single-letter storage marker, one of {@link #PATH_DV_MARKER}, {@link
   *     #INLINE_DV_MARKER} or {@link #UUID_DV_MARKER}
   * @param pathOrInlineDv path, encoded UUID (with optional prefix) or inlined DV content,
   *     depending on {@code storageType}
   * @param offset byte offset of the DV inside its file, if any
   * @param sizeInBytes size of the serialized DV in bytes
   * @param cardinality number of rows the DV removes
   */
  public DeletionVectorDescriptor(
      String storageType,
      String pathOrInlineDv,
      Optional<Integer> offset,
      int sizeInBytes,
      long cardinality) {
    this.storageType = storageType;
    this.pathOrInlineDv = pathOrInlineDv;
    this.offset = offset;
    this.sizeInBytes = sizeInBytes;
    this.cardinality = cardinality;
  }

  /** @return the marker describing how the DV is stored */
  public String getStorageType() {
    return storageType;
  }

  /** @return the path, encoded UUID or inlined content of the DV */
  public String getPathOrInlineDv() {
    return pathOrInlineDv;
  }

  /** @return the byte offset of the DV inside its file, if any */
  public Optional<Integer> getOffset() {
    return offset;
  }

  /** @return the size of the serialized DV in bytes */
  public int getSizeInBytes() {
    return sizeInBytes;
  }

  /** @return the number of rows the DV logically removes from the file */
  public long getCardinality() {
    return cardinality;
  }

  /**
   * Returns an identifier built from the storage type and {@code pathOrInlineDv}, followed by
   * {@code "@"} and the offset value when an offset is present.
   *
   * @return the identifier of this deletion vector
   */
  public String getUniqueId() {
    final String uniqueFileId = storageType + pathOrInlineDv;
    if (offset.isPresent()) {
      // Append the offset value itself, not the Optional wrapper ("Optional[4]").
      return uniqueFileId + "@" + offset.get();
    }
    return uniqueFileId;
  }

  /** @return true when the DV content is stored inline in the log */
  public boolean isInline() {
    // Value comparison: storage types read from the log are not the interned constant, so `==`
    // would report inline DVs as on-disk. Constant-first keeps this null-safe.
    return INLINE_DV_MARKER.equals(storageType);
  }

  /** @return true when the DV is not stored inline */
  public boolean isOnDisk() {
    return !isInline();
  }

  /**
   * Decodes the inlined DV content.
   *
   * <p>Only valid for inline DVs; the argument check fails otherwise.
   *
   * @return the decoded bytes of the DV
   */
  public byte[] inlineData() {
    checkArgument(isInline(), "Can't get data for an on-disk DV from the log.");
    // The sizeInBytes is used to remove any padding that might have been added during encoding.
    return Base85Codec.decodeBytes(pathOrInlineDv, sizeInBytes);
  }

  /**
   * Resolves the location of the file that stores this DV.
   *
   * <p>Only valid for on-disk DVs; the argument check fails for inline DVs.
   *
   * @param tableLocation location of the table, used as parent for UUID-based DV files
   * @return the path of the DV file as a string
   * @throws RuntimeException if the path URI cannot be parsed or the storage type is neither
   *     {@link #UUID_DV_MARKER} nor {@link #PATH_DV_MARKER}
   */
  public String getAbsolutePath(String tableLocation) {
    checkArgument(isOnDisk(), "Can't get a path for an inline deletion vector");
    if (UUID_DV_MARKER.equals(storageType)) {
      // If the file was written with a random prefix, we have to extract that,
      // before decoding the UUID.
      final int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH;
      // Guard against truncated values, which would otherwise fail inside substring().
      checkArgument(
          randomPrefixLength >= 0,
          "Deletion vector path is shorter than an encoded UUID: " + pathOrInlineDv);
      final String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength);
      final String encodedUuid = pathOrInlineDv.substring(randomPrefixLength);
      final UUID uuid = Base85Codec.decodeUUID(encodedUuid);
      return assembleDeletionVectorPath(tableLocation, uuid, randomPrefix).toString();
    } else if (PATH_DV_MARKER.equals(storageType)) {
      // Since there is no need for legacy support for relative paths for DVs,
      // relative DVs should *always* use the UUID variant.
      try {
        final URI parsedUri = new URI(pathOrInlineDv);
        checkArgument(parsedUri.isAbsolute(), "Relative URIs are not supported for DVs");
        return new Path(parsedUri).toString();
      } catch (URISyntaxException e) {
        // Same message as before, but keep the original exception as the cause.
        throw new RuntimeException("Couldn't parse uri:\n" + e, e);
      }
    } else {
      throw new RuntimeException(
          "A uri "
              + pathOrInlineDv
              + " which cannot be turned into a relative path as found in the transaction log");
    }
  }

  /**
   * Return the unique path under `targetParentPath` that is based on `id`.
   *
   * <p>When `prefix` is non-empty, the file is placed in a `prefix` sub-directory of
   * `targetParentPath`.
   */
  private static Path assembleDeletionVectorPath(String targetParentPath, UUID id, String prefix) {
    final String fileName =
        String.format("%s_%s.bin", DELETION_VECTOR_FILE_NAME_CORE, id.toString());
    if (!prefix.isEmpty()) {
      return new Path(new Path(targetParentPath, prefix), fileName);
    }
    return new Path(targetParentPath, fileName);
  }

  @Override
  public String toString() {
    return String.format(
        "DeletionVectorDescriptor(storageType=%s, pathOrInlineDv=%s, offset=%s, "
            + "sizeInBytes=%s, cardinality=%s)",
        storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
  }

  /** @return Row representation of this deletion vector descriptor */
  public Row toRow() {
    final Map<Integer, Object> fieldMap = new HashMap<>();

    fieldMap.put(COL_NAME_TO_ORDINAL.get("storageType"), storageType);
    fieldMap.put(COL_NAME_TO_ORDINAL.get("pathOrInlineDv"), pathOrInlineDv);

    // Only add offset if it's present; otherwise no entry is put in the map for it.
    if (offset.isPresent()) {
      fieldMap.put(COL_NAME_TO_ORDINAL.get("offset"), offset.get());
    }

    fieldMap.put(COL_NAME_TO_ORDINAL.get("sizeInBytes"), sizeInBytes);
    fieldMap.put(COL_NAME_TO_ORDINAL.get("cardinality"), cardinality);

    return new GenericRow(READ_SCHEMA, fieldMap);
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof DeletionVectorDescriptor)) {
      return false;
    }
    final DeletionVectorDescriptor dv = (DeletionVectorDescriptor) o;
    return Objects.equals(storageType, dv.storageType)
        && Objects.equals(pathOrInlineDv, dv.pathOrInlineDv)
        && Objects.equals(offset, dv.offset)
        && this.sizeInBytes == dv.sizeInBytes
        && this.cardinality == dv.cardinality;
  }

  @Override
  public int hashCode() {
    return Objects.hash(storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
  }
}
